雖然已經到尾聲了,但我發現有個重要的 Airflow 元件沒有提到,所以必須來補這篇。
Airflow XCom 是一種在不同的 task 之間傳遞資料的機制,它的全名是 cross-communication。Airflow XCom 的觀念是,每個 task 都可以產生一個或多個 key-value pair 的資料,並將它們儲存到 metadata database 中。這些資料可以被其他的 task 讀取或使用,以實現 task 之間的資料交換或依賴。
Airflow XCom 的優點是,它可以讓 task 之間的邏輯更清晰,並減少重複的程式碼。Airflow XCom 的缺點是,它會增加 metadata database 的負擔,並可能影響 Airflow 的效能。因為每次傳遞資料時,都需要將資料序列化並儲存到資料庫中。這樣會增加資料庫的負擔,並且延長 task 的執行時間。
因此,建議只使用 Airflow XCom 傳遞少量且重要的資料,例如參數或狀態,或是某些路徑。
task1 產生了一批資料,我們有三種選擇:
優點:沒有任何的資源浪費,所有資料都在記憶體內流轉,沒用到 XCom。
缺點:沒有妥善利用 Airflow 提供的 Task 功能,也沒辦法做 pipeline 的排程優化。
如果資料後續的轉換是複雜的,像是還要再跟其他資料 join 的話,不建議這樣做。但如果足夠單純,是可以考慮的。
不建議寫到 local disk 是因為如果你的 Airflow 大到要用分散式架構,每個 operator 可能在不同的機器上執行,所以 local disk 的資料可能會讀取失敗。
至於其他儲存如 S3,通常你上傳之後都會得到一個路徑如 “xxx_example_path@S3”,則我們可以將這個路徑透過 XCom 傳給其他的 task,下一個 task 再去將資料載回來後處理。
def push_data(**kwargs):
data = query_from_somewhere()
path = load_to_s3(data)
# 這個函數會回傳S3路徑,Airflow 預設會將它存在 XCom 內
return path
def pull_data(**kwargs):
# 這個函數會從 XCom 中取得資料
path = kwargs['ti'].xcom_pull(task_ids='push_task')
data = query_from_s3(path)
do_something(path)
with DAG('xcom_example', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
push_task = PythonOperator(
task_id='push_task',
python_callable=push_data
)
pull_task = PythonOperator(
task_id='pull_task',
python_callable=pull_data
)
push_task >> pull_task
優點:傳送路徑不太可能遇到資料過多的狀況,每個 task 也沒什麼相依性,就看資料是否存在。
缺點:有點脫褲子放屁,每次需要資料時都要去某個地方重新載入。如果資料真的很大,也不見得多省時省資源。
考慮網路頻寬跟流量,如果你的資料很大,Airflow 又不是在 AWS 或 GCP 上的話,可以考慮架一個 HDFS 來存放。如果在雲上,又願意捨得花流量費的話,那就用它們的倉儲吧。
跟選擇二的程式類似,只是寫入的是資料本身。要注意如果是自訂 class,需要先實作序列化跟反序列化,才能直接將該物化寫入 XCom。
另外,依你的 metadata database 而定,有不同的傳輸上限
PostgreSQL: 1 GB
MySQL: 64KB
sqlite: 2 GB
如果你沒有任何額外設定,預設是 48KB。所以大量資料就會很容易炸掉。
優點:資料直接傳輸,最直覺。
缺點:大量資料有可能超過 database 上限,導致執行失敗